package main

import (
	"errors"
	"fmt"
	"io"
	"log"
	"strings"
	"time"

	"github.com/IBM/sarama"
	"github.com/gogf/gf/util/gconv"
)

// main runs a simple single-partition Kafka consumer against a local broker.
//
// It consumes partition 0 of the test topic starting at the newest offset,
// prints every message it receives, and reacts to consumer errors: errors
// classified as transient network failures are logged and followed by a short
// pause before the loop continues; any other error ends the program.
func main() {
	// Pause applied after a transient network error before resuming the loop.
	const retryBackoff = 5 * time.Second

	// TODO: topic used for testing only.
	topic := "my-topic123"
	brokers := []string{"localhost:9092"}

	config := sarama.NewConfig()
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
	// Start from the newest data when there is no previously stored offset.
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	// Ask sarama to report consume errors on the Errors() channel read below.
	config.Consumer.Return.Errors = true

	consumer, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		log.Fatalf("Error creating consumer: %v\n", err)
	}
	defer func() {
		// Log instead of log.Fatalf: exiting the process from inside a
		// deferred function would skip any deferred cleanup still pending.
		if err := consumer.Close(); err != nil {
			log.Printf("Error closing consumer: %v\n", err)
		}
	}()

	// Begin at the newest offset of partition 0.
	partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
	if err != nil {
		// log.Fatalf exits without running deferred calls, so release the
		// consumer explicitly before bailing out.
		if closeErr := consumer.Close(); closeErr != nil {
			log.Printf("Error closing consumer: %v\n", closeErr)
		}
		log.Fatalf("Error creating partition consumer: %v\n", err)
	}
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			log.Printf("Error closing partition consumer: %v\n", err)
		}
	}()

	messages := partitionConsumer.Messages()
	consumeErrs := partitionConsumer.Errors()

	for {
		select {
		case msg, ok := <-messages:
			if !ok {
				// A closed channel yields a nil message; stop instead of
				// dereferencing it.
				log.Println("Message channel closed, stopping consumer")
				return
			}

			// TODO: business logic for each message goes here.
			fmt.Printf("Received message: partition: %v, offset: %v, value: %v\n", msg.Partition, msg.Offset, gconv.String(msg.Value))

		case consumeErr, ok := <-consumeErrs:
			if !ok {
				// A closed channel yields a nil error value; stop instead of
				// dereferencing it.
				log.Println("Error channel closed, stopping consumer")
				return
			}

			log.Printf("Error receiving message: %v\n", consumeErr)
			fmt.Println("err.Err:>>>>> ", consumeErr.Err)

			if !isRetryableConsumerError(consumeErr) {
				// TODO: any other error is treated as fatal for this demo.
				fmt.Println("其他异常 可退出...")
				return
			}

			// Transient network problem: wait a little, then keep consuming.
			log.Println("Network error, reconnecting...!!!!!!!!!!!!!!!!!!!!")
			time.Sleep(retryBackoff)
		}
	}
}

// isRetryableConsumerError reports whether err is one of the network-level
// failures after which the consume loop should pause and keep going rather
// than exit. Extend the lists below to make more error kinds retryable.
func isRetryableConsumerError(err error) bool {
	if err == nil {
		return false
	}

	// Preferred check: errors.Is walks the wrap chain, so it still matches
	// when the sentinel is wrapped (a direct == on the inner error does not).
	if errors.Is(err, sarama.ErrOutOfBrokers) ||
		errors.Is(err, sarama.ErrBrokerNotFound) ||
		errors.Is(err, io.EOF) {
		return true
	}

	// Fallback: match on the message text so that every error that was
	// retried by the previous text-based check is still retried.
	text := err.Error()
	for _, marker := range [...]string{
		"client has run out of available brokers to talk to",
		"broker for ID is not found",
		"EOF",
	} {
		if strings.Contains(text, marker) {
			return true
		}
	}
	return false
}
